Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions apps/sim/lib/copilot/request/session/buffer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ describe('mothership-stream-outbox', () => {
expect(replayed.map((entry) => entry.payload.text)).toEqual(['world'])
})

it('does not trim active stream history while appending events', async () => {
it('trims active stream history to eventLimit on every append', async () => {
const cursor = await allocateCursor('stream-1')

await appendEvent(
Expand All @@ -163,7 +163,11 @@ describe('mothership-stream-outbox', () => {
})
)

expect(mockRedis.zremrangebyrank).not.toHaveBeenCalled()
expect(mockRedis.zremrangebyrank).toHaveBeenCalledWith(
'mothership_stream:stream-1:events',
0,
-5_001
)
Comment thread
waleedlatif1 marked this conversation as resolved.
})

it('clears persisted stream state during teardown cleanup', async () => {
Expand Down
1 change: 1 addition & 0 deletions apps/sim/lib/copilot/request/session/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ export async function appendEvents(
zaddArgs.push(envelope.seq, JSON.stringify(envelope))
}
pipeline.zadd(key, ...(zaddArgs as [number, string, ...Array<number | string>]))
pipeline.zremrangebyrank(key, 0, -config.eventLimit - 1)
pipeline.expire(key, config.ttlSeconds)
pipeline.set(seqKey, String(envelopes[envelopes.length - 1].seq), 'EX', config.ttlSeconds)
await pipeline.exec()
Expand Down
20 changes: 19 additions & 1 deletion apps/sim/lib/core/idempotency/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ export interface IdempotencyConfig {
namespace?: string
/** When true, failed keys are deleted rather than stored so the operation is retried on the next attempt. */
retryFailures?: boolean
/**
* When false, only `{ success, status, error? }` is persisted — not the
* operation's return value. Duplicate calls still short-circuit but
* resolve to `undefined`. Use when callers don't consume the cached
* body (e.g. webhook receivers, where the provider just wants a 2xx).
* Defaults to true.
*/
storeResultBody?: boolean
/**
* Force a specific storage backend regardless of the environment's
* auto-detection. Use `'database'` for correctness-critical flows
Expand Down Expand Up @@ -77,6 +85,7 @@ export class IdempotencyService {
ttlSeconds: config.ttlSeconds ?? DEFAULT_TTL,
namespace: config.namespace ?? 'default',
retryFailures: config.retryFailures ?? false,
storeResultBody: config.storeResultBody ?? true,
}
this.storageMethod = config.forceStorage ?? getStorageMethod()
logger.info(`IdempotencyService using ${this.storageMethod} storage`, {
Expand Down Expand Up @@ -441,7 +450,9 @@ export class IdempotencyService {

await this.storeResult(
claimResult.normalizedKey,
{ success: true, result, status: 'completed' },
this.config.storeResultBody
? { success: true, result, status: 'completed' }
: { success: true, status: 'completed' },
claimResult.storageMethod
)

Expand Down Expand Up @@ -510,15 +521,22 @@ export class IdempotencyService {
}
}

/**
* As a webhook receiver we only need a "we saw this delivery" marker —
* the provider's retry just needs a 2xx, not our cached response body.
* TTL must exceed the longest provider retry window (Gmail / Pub-Sub: 7d).
*/
export const webhookIdempotency = new IdempotencyService({
namespace: 'webhook',
ttlSeconds: 60 * 60 * 24 * 7, // 7 days
storeResultBody: false,
})

export const pollingIdempotency = new IdempotencyService({
namespace: 'polling',
ttlSeconds: 60 * 60 * 24 * 3, // 3 days
retryFailures: true,
storeResultBody: false,
})

/**
Expand Down
Loading