@@ -586,3 +676,41 @@ function SpanWithDuration({
);
}
+
+const edgeBoundary = 0.05;
+
+function CurrentTimeIndicator({ totalDuration }: { totalDuration: number }) {
+ return (
+
+ {(ms) => {
+ const ratio = ms / nanosecondsToMilliseconds(totalDuration);
+ let offset = 0.5;
+ if (ratio < edgeBoundary) {
+ offset = lerp(0, 0.5, ratio / edgeBoundary);
+ } else if (ratio > 1 - edgeBoundary) {
+ offset = lerp(0.5, 1, (ratio - (1 - edgeBoundary)) / edgeBoundary);
+ }
+
+ return (
+
+
+
+ {formatDurationMilliseconds(ms, {
+ style: "short",
+ maxDecimalPoints: ms < 1000 ? 0 : 1,
+ })}
+
+
+
+
+ );
+ }}
+
+ );
+}
diff --git a/apps/webapp/app/utils/lerp.ts b/apps/webapp/app/utils/lerp.ts
new file mode 100644
index 00000000000..c2df9a5d2d2
--- /dev/null
+++ b/apps/webapp/app/utils/lerp.ts
@@ -0,0 +1,15 @@
+/** Linearly interpolates between the min/max values, using t.
+ * It can't go outside the range */
+export function lerp(min: number, max: number, t: number) {
+ return min + (max - min) * clamp(t, 0, 1);
+}
+
+/** Inverse lerp */
+export function inverseLerp(min: number, max: number, value: number) {
+ return (value - min) / (max - min);
+}
+
+/** Clamps a value between a min and max */
+export function clamp(value: number, min: number, max: number) {
+ return Math.min(max, Math.max(min, value));
+}
diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts
index 070e1b44a12..c7fd2cfd4ea 100644
--- a/apps/webapp/app/v3/eventRepository.server.ts
+++ b/apps/webapp/app/v3/eventRepository.server.ts
@@ -12,7 +12,6 @@ import {
flattenAndNormalizeAttributes,
flattenAttributes,
isExceptionSpanEvent,
- logger,
omit,
unflattenAttributes,
} from "@trigger.dev/core/v3";
@@ -21,6 +20,10 @@ import { createHash } from "node:crypto";
import { PrismaClient, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
+import Redis, { RedisOptions } from "ioredis";
+import { env } from "~/env.server";
+import { EventEmitter } from "node:stream";
+import { logger } from "~/services/logger.server";
export type CreatableEvent = Omit<
Prisma.TaskEventCreateInput,
@@ -77,6 +80,7 @@ export type EventBuilder = {
export type EventRepoConfig = {
batchSize: number;
batchInterval: number;
+ redis: RedisOptions;
};
export type QueryOptions = Prisma.TaskEventWhereInput;
@@ -119,8 +123,8 @@ export type UpdateEventOptions = {
export class EventRepository {
private readonly _flushScheduler: DynamicFlushScheduler
;
-
private _randomIdGenerator = new RandomIdGenerator();
+ private _redisPublishClient: Redis;
constructor(private db: PrismaClient = prisma, private readonly _config: EventRepoConfig) {
this._flushScheduler = new DynamicFlushScheduler({
@@ -128,6 +132,8 @@ export class EventRepository {
flushInterval: _config.batchInterval,
callback: this.#flushBatch.bind(this),
});
+
+ this._redisPublishClient = new Redis(this._config.redis);
}
async insert(event: CreatableEvent) {
@@ -138,6 +144,8 @@ export class EventRepository {
await this.db.taskEvent.create({
data: event as Prisma.TaskEventCreateInput,
});
+
+ this.#publishToRedis([event]);
}
async insertMany(events: CreatableEvent[]) {
@@ -576,12 +584,50 @@ export class EventRepository {
return result;
}
+ async subscribeToTrace(traceId: string) {
+ const redis = new Redis(this._config.redis);
+
+ const channel = `events:${traceId}:*`;
+
+ // Subscribe to the channel.
+ await redis.psubscribe(channel);
+
+ const eventEmitter = new EventEmitter();
+
+ // Define the message handler.
+ redis.on("pmessage", (pattern, channelReceived, message) => {
+ if (channelReceived.startsWith(`events:${traceId}:`)) {
+ eventEmitter.emit("message", message);
+ }
+ });
+
+ // Return a function that can be used to unsubscribe.
+ const unsubscribe = async () => {
+ await redis.punsubscribe(channel);
+ };
+
+ return {
+ unsubscribe,
+ eventEmitter,
+ };
+ }
+
async #flushBatch(batch: CreatableEvent[]) {
const events = excludePartialEventsWithCorrespondingFullEvent(batch);
await this.db.taskEvent.createMany({
data: events as Prisma.TaskEventCreateManyInput[],
});
+
+ this.#publishToRedis(events);
+ }
+
+ async #publishToRedis(events: CreatableEvent[]) {
+ if (events.length === 0) return;
+ const uniqueTraceSpans = new Set(events.map((e) => `events:${e.traceId}:${e.spanId}`));
+ for (const id of uniqueTraceSpans) {
+ await this._redisPublishClient.publish(id, new Date().toISOString());
+ }
}
public generateTraceId() {
@@ -614,6 +660,14 @@ export class EventRepository {
export const eventRepository = new EventRepository(prisma, {
batchSize: 100,
batchInterval: 5000,
+ redis: {
+ port: env.REDIS_PORT,
+ host: env.REDIS_HOST,
+ username: env.REDIS_USERNAME,
+ password: env.REDIS_PASSWORD,
+ enableAutoPipelining: true,
+ ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
+ },
});
export function stripAttributePrefix(attributes: Attributes, prefix: string) {
diff --git a/packages/core/src/v3/index.ts b/packages/core/src/v3/index.ts
index d17ca66e2bc..d16422d1b83 100644
--- a/packages/core/src/v3/index.ts
+++ b/packages/core/src/v3/index.ts
@@ -16,6 +16,7 @@ export {
formatDurationNanoseconds,
formatDurationInDays,
nanosecondsToMilliseconds,
+ millisecondsToNanoseconds,
} from "./utils/durations";
export { getEnvVar } from "./utils/getEnv";
diff --git a/tests/e2e/e2e.spec.ts b/tests/e2e/e2e.spec.ts
index 684425c7dd4..8b765de98d9 100644
--- a/tests/e2e/e2e.spec.ts
+++ b/tests/e2e/e2e.spec.ts
@@ -31,13 +31,13 @@ test("Verify jobs from the test nextjs project", async ({ page }) => {
await page.locator("a").filter({ hasText: "Test Project" }).click();
await page.getByRole("link", { name: "Environments & API Keys" }).click();
- await expect(page.locator("h1").filter({ hasText: "Environments & API Keys" })).toBeVisible();
+ await expect(page.locator("h2").filter({ hasText: "Environments & API Keys" })).toBeVisible();
await expect(
page.locator("h3").filter({ hasText: "nextjs-test" })
// Set the timeout high to allow the cli to register jobs
).toBeVisible({ timeout: 15000 });
await page.getByRole("link", { name: "Jobs" }).click();
- await expect(page.locator("h1").filter({ hasText: /^Jobs$/ })).toBeVisible();
+ await expect(page.locator("h2").filter({ hasText: /^Jobs$/ })).toBeVisible();
await expect(page.getByRole("link", { name: /Test Job One/ })).toBeVisible();
});