forked from anomalyco/opencode
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsync.ts
More file actions
118 lines (113 loc) · 3.53 KB
/
sync.ts
File metadata and controls
118 lines (113 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import z from "zod"
import { Hono } from "hono"
import { describeRoute, validator, resolver } from "hono-openapi"
import { SyncEvent } from "@/sync"
import { Database, asc, and, not, or, lte, eq } from "@/storage/db"
import { EventTable } from "@/sync/event.sql"
import { lazy } from "@/util/lazy"
import { Log } from "@/util/log"
import { errors } from "../error"
const ReplayEvent = z.object({
id: z.string(),
aggregateID: z.string(),
seq: z.number().int().min(0),
type: z.string(),
data: z.record(z.string(), z.unknown()),
})
const log = Log.create({ service: "server.sync" })
export const SyncRoutes = lazy(() =>
new Hono()
.post(
"/replay",
describeRoute({
summary: "Replay sync events",
description: "Validate and replay a complete sync event history.",
operationId: "sync.replay",
responses: {
200: {
description: "Replayed sync events",
content: {
"application/json": {
schema: resolver(
z.object({
sessionID: z.string(),
}),
),
},
},
},
...errors(400),
},
}),
validator(
"json",
z.object({
directory: z.string(),
events: z.array(ReplayEvent).min(1),
}),
),
async (c) => {
const body = c.req.valid("json")
const events = body.events
const source = events[0].aggregateID
log.info("sync replay requested", {
sessionID: source,
events: events.length,
first: events[0]?.seq,
last: events.at(-1)?.seq,
directory: body.directory,
})
SyncEvent.replayAll(events)
log.info("sync replay complete", {
sessionID: source,
events: events.length,
first: events[0]?.seq,
last: events.at(-1)?.seq,
})
return c.json({
sessionID: source,
})
},
)
.get(
"/history",
describeRoute({
summary: "List sync events",
description:
"List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.",
operationId: "sync.history.list",
responses: {
200: {
description: "Sync events",
content: {
"application/json": {
schema: resolver(
z.array(
z.object({
id: z.string(),
aggregate_id: z.string(),
seq: z.number(),
type: z.string(),
data: z.record(z.string(), z.unknown()),
}),
),
),
},
},
},
...errors(400),
},
}),
validator("json", z.record(z.string(), z.number().int().min(0))),
async (c) => {
const body = c.req.valid("json")
const exclude = Object.entries(body)
const where =
exclude.length > 0
? not(or(...exclude.map(([id, seq]) => and(eq(EventTable.aggregate_id, id), lte(EventTable.seq, seq))))!)
: undefined
const rows = Database.use((db) => db.select().from(EventTable).where(where).orderBy(asc(EventTable.seq)).all())
return c.json(rows)
},
),
)