Skip to content

Commit 9ca0386

Browse files
committed
refactor(console): extract analyzeUsageAt helper in subscription
- Create shared analyzeUsageAt function to reduce code duplication - Simplify analyzeRollingUsage, analyzeWeeklyUsage, and analyzeMonthlyUsage fix(enterprise): add optimistic locking to Storage.update - Add version tracking via ETag for concurrent write safety - Implement retry logic on conflict detection - Update Adapter interface to support versioning
1 parent c0b1481 commit 9ca0386

2 files changed

Lines changed: 57 additions & 56 deletions

File tree

packages/console/core/src/subscription.ts

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,23 @@ import { fn } from "./util/fn"
33
import { centsToMicroCents } from "./util/price"
44
import { getWeekBounds, getMonthlyBounds } from "./util/date"
55

6+
function analyzeUsageAt(limitInCents: number, usage: number, periodEnd: Date) {
7+
const now = new Date()
8+
const limitInMicroCents = centsToMicroCents(limitInCents)
9+
if (usage < limitInMicroCents) {
10+
return {
11+
status: "ok" as const,
12+
resetInSec: Math.ceil((periodEnd.getTime() - now.getTime()) / 1000),
13+
usagePercent: Math.floor(Math.min(100, (usage / limitInMicroCents) * 100)),
14+
}
15+
}
16+
return {
17+
status: "rate-limited" as const,
18+
resetInSec: Math.ceil((periodEnd.getTime() - now.getTime()) / 1000),
19+
usagePercent: 100,
20+
}
21+
}
22+
623
export namespace Subscription {
724
export const analyzeRollingUsage = fn(
825
z.object({
@@ -14,7 +31,7 @@ export namespace Subscription {
1431
({ limit, window, usage, timeUpdated }) => {
1532
const now = new Date()
1633
const rollingWindowMs = window * 3600 * 1000
17-
const rollingLimitInMicroCents = centsToMicroCents(limit * 100)
34+
const rollingLimitInCents = limit * 100
1835
const windowStart = new Date(now.getTime() - rollingWindowMs)
1936
if (timeUpdated < windowStart) {
2037
return {
@@ -25,18 +42,7 @@ export namespace Subscription {
2542
}
2643

2744
const windowEnd = new Date(timeUpdated.getTime() + rollingWindowMs)
28-
if (usage < rollingLimitInMicroCents) {
29-
return {
30-
status: "ok" as const,
31-
resetInSec: Math.ceil((windowEnd.getTime() - now.getTime()) / 1000),
32-
usagePercent: Math.floor(Math.min(100, (usage / rollingLimitInMicroCents) * 100)),
33-
}
34-
}
35-
return {
36-
status: "rate-limited" as const,
37-
resetInSec: Math.ceil((windowEnd.getTime() - now.getTime()) / 1000),
38-
usagePercent: 100,
39-
}
45+
return analyzeUsageAt(rollingLimitInCents, usage, windowEnd)
4046
},
4147
)
4248

@@ -49,27 +55,14 @@ export namespace Subscription {
4955
({ limit, usage, timeUpdated }) => {
5056
const now = new Date()
5157
const week = getWeekBounds(now)
52-
const fixedLimitInMicroCents = centsToMicroCents(limit * 100)
5358
if (timeUpdated < week.start) {
5459
return {
5560
status: "ok" as const,
5661
resetInSec: Math.ceil((week.end.getTime() - now.getTime()) / 1000),
5762
usagePercent: 0,
5863
}
5964
}
60-
if (usage < fixedLimitInMicroCents) {
61-
return {
62-
status: "ok" as const,
63-
resetInSec: Math.ceil((week.end.getTime() - now.getTime()) / 1000),
64-
usagePercent: Math.floor(Math.min(100, (usage / fixedLimitInMicroCents) * 100)),
65-
}
66-
}
67-
68-
return {
69-
status: "rate-limited" as const,
70-
resetInSec: Math.ceil((week.end.getTime() - now.getTime()) / 1000),
71-
usagePercent: 100,
72-
}
65+
return analyzeUsageAt(limit * 100, usage, week.end)
7366
},
7467
)
7568

@@ -83,27 +76,14 @@ export namespace Subscription {
8376
({ limit, usage, timeUpdated, timeSubscribed }) => {
8477
const now = new Date()
8578
const month = getMonthlyBounds(now, timeSubscribed)
86-
const fixedLimitInMicroCents = centsToMicroCents(limit * 100)
8779
if (timeUpdated < month.start) {
8880
return {
8981
status: "ok" as const,
9082
resetInSec: Math.ceil((month.end.getTime() - now.getTime()) / 1000),
9183
usagePercent: 0,
9284
}
9385
}
94-
if (usage < fixedLimitInMicroCents) {
95-
return {
96-
status: "ok" as const,
97-
resetInSec: Math.ceil((month.end.getTime() - now.getTime()) / 1000),
98-
usagePercent: Math.floor(Math.min(100, (usage / fixedLimitInMicroCents) * 100)),
99-
}
100-
}
101-
102-
return {
103-
status: "rate-limited" as const,
104-
resetInSec: Math.ceil((month.end.getTime() - now.getTime()) / 1000),
105-
usagePercent: 100,
106-
}
86+
return analyzeUsageAt(limit * 100, usage, month.end)
10787
},
10888
)
10989
}

packages/enterprise/src/core/storage.ts

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,37 @@ import { lazy } from "@opencode-ai/util/lazy"
33

44
export namespace Storage {
55
export interface Adapter {
6-
read(path: string): Promise<string | undefined>
7-
write(path: string, value: string): Promise<void>
6+
read(path: string): Promise<{ value: string; etag?: string } | undefined>
7+
write(path: string, value: string, options?: { ifMatch?: string }): Promise<void>
88
remove(path: string): Promise<void>
99
list(options?: { prefix?: string; limit?: number; after?: string; before?: string }): Promise<string[]>
1010
}
1111

1212
function createAdapter(client: AwsClient, endpoint: string, bucket: string): Adapter {
1313
const base = `${endpoint}/${bucket}`
1414
return {
15-
async read(path: string): Promise<string | undefined> {
15+
async read(path: string): Promise<{ value: string; etag?: string } | undefined> {
1616
const response = await client.fetch(`${base}/${path}`)
1717
if (response.status === 404) return undefined
1818
if (!response.ok) throw new Error(`Failed to read ${path}: ${response.status}`)
19-
return response.text()
19+
const value = await response.text()
20+
const etag = response.headers.get("ETag") ?? undefined
21+
return { value, etag }
2022
},
2123

22-
async write(path: string, value: string): Promise<void> {
24+
async write(path: string, value: string, options?: { ifMatch?: string }): Promise<void> {
25+
const headers: Record<string, string> = {
26+
"Content-Type": "application/json",
27+
}
28+
if (options?.ifMatch) {
29+
headers["If-Match"] = options.ifMatch
30+
}
2331
const response = await client.fetch(`${base}/${path}`, {
2432
method: "PUT",
2533
body: value,
26-
headers: {
27-
"Content-Type": "application/json",
28-
},
34+
headers,
2935
})
36+
if (response.status === 412) throw new Error("Conflict: version mismatch")
3037
if (!response.ok) throw new Error(`Failed to write ${path}: ${response.status}`)
3138
},
3239

@@ -97,7 +104,7 @@ export namespace Storage {
97104
export async function read<T>(key: string[]) {
98105
const result = await adapter().read(resolve(key))
99106
if (!result) return undefined
100-
return JSON.parse(result) as T
107+
return JSON.parse(result.value) as T
101108
}
102109

103110
export function write<T>(key: string[], value: T) {
@@ -119,11 +126,25 @@ export namespace Storage {
119126
return result.map((x) => x.replace(/\.json$/, "").split("/"))
120127
}
121128

122-
export async function update<T>(key: string[], fn: (draft: T) => void) {
123-
const val = await read<T>(key)
124-
if (!val) throw new Error("Not found")
125-
fn(val)
126-
await write(key, val)
127-
return val
129+
export async function update<T extends { _v?: number }>(key: string[], fn: (draft: T) => void, retries = 3) {
130+
const path = resolve(key)
131+
for (let i = 0; i < retries; i++) {
132+
const result = await adapter().read(path)
133+
if (!result) throw new Error("Not found")
134+
const val = JSON.parse(result.value) as T
135+
const oldVersion = val._v ?? 0
136+
fn(val)
137+
val._v = oldVersion + 1
138+
try {
139+
await adapter().write(path, JSON.stringify(val), { ifMatch: result.etag })
140+
return val
141+
} catch (err) {
142+
if (err instanceof Error && err.message.includes("Conflict") && i < retries - 1) {
143+
continue
144+
}
145+
throw err
146+
}
147+
}
148+
throw new Error("Failed to update after retries")
128149
}
129150
}

0 commit comments

Comments
 (0)