-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathcache.go
More file actions
323 lines (275 loc) · 9.19 KB
/
Copy pathcache.go
File metadata and controls
323 lines (275 loc) · 9.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
package files
import (
"bytes"
"context"
"io/fs"
"sync"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/xerrors"
archivefs "github.com/coder/coder/v2/archive/fs"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/util/lazy"
)
// FileAcquirer is implemented by types that can hand out a reference-counted
// fs.FS for a file stored in the database. For the *Cache implementation,
// every successful Acquire must be paired with a Close on the returned
// *CloseFS; other implementations presumably share that contract.
type FileAcquirer interface {
	Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error)
}
// New returns an empty file cache. Files are fetched lazily from the
// database passed to each Acquire call. The cache's Prometheus collectors
// are registered with registerer, and authz is used to check that each
// caller may read a file before it is handed out.
func New(registerer prometheus.Registerer, authz rbac.Authorizer) *Cache {
	metrics := newCacheMetrics(registerer)
	// The zero value of the mutex is ready to use, so it is not set here.
	return &Cache{
		data:         make(map[uuid.UUID]*cacheEntry),
		authz:        authz,
		cacheMetrics: metrics,
	}
}
// newCacheMetrics creates the Prometheus collectors used by the file cache
// and registers them with registerer through a promauto factory. All of them
// live under the "coderd" namespace and "file_cache" subsystem.
func newCacheMetrics(registerer prometheus.Registerer) cacheMetrics {
	const (
		namespace = "coderd"
		subsystem = "file_cache"
	)
	factory := promauto.With(registerer)

	gauge := func(name, help string) prometheus.Gauge {
		return factory.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      name,
			Help:      help,
		})
	}
	counter := func(name, help string) prometheus.Counter {
		return factory.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      name,
			Help:      help,
		})
	}

	// Collectors are created in the same order as before so registration
	// order is unchanged.
	return cacheMetrics{
		currentCacheSize: gauge("open_files_size_bytes_current",
			"The current amount of memory of all files currently open in the file cache."),
		totalCacheSize: counter("open_files_size_bytes_total",
			"The total amount of memory ever opened in the file cache. This number never decrements."),
		currentOpenFiles: gauge("open_files_current",
			"The count of unique files currently open in the file cache."),
		totalOpenedFiles: counter("open_files_total",
			"The total count of unique files ever opened in the file cache."),
		currentOpenFileReferences: gauge("open_file_refs_current",
			"The count of file references currently open in the file cache. Multiple references can be held for the same file."),
		totalOpenFileReferences: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "open_file_refs_total",
			Help:      "The total number of file references ever opened in the file cache. The 'hit' label indicates if the file was loaded from the cache.",
		}, []string{"hit"}),
	}
}
// Cache persists the files for template versions, and is used by dynamic
// parameters to deduplicate the files in memory. When any number of users open
// the workspace creation form for a given template version, its files are
// loaded into memory exactly once. We hold those files until there are no
// longer any open references to them, and then we remove the value from the
// map.
type Cache struct {
	// lock guards data and the refCount of every entry in it.
	lock sync.Mutex
	// data maps a file ID to its cache entry, whose value may still be
	// loading.
	data map[uuid.UUID]*cacheEntry
	// authz checks that each caller of Acquire may read the file.
	authz rbac.Authorizer

	// metrics
	cacheMetrics
}
// cacheMetrics groups the Prometheus collectors that describe the file cache.
// The "current" gauges move up and down with the cache contents, while the
// "total" counters only ever increase.
type cacheMetrics struct {
	// Outstanding references handed out by the cache.
	currentOpenFileReferences prometheus.Gauge
	// References ever handed out, labeled by whether they were cache hits.
	totalOpenFileReferences *prometheus.CounterVec

	// Distinct files currently held in the cache.
	currentOpenFiles prometheus.Gauge
	// Distinct files ever added to the cache.
	totalOpenedFiles prometheus.Counter

	// Bytes of raw file data currently held in the cache.
	currentCacheSize prometheus.Gauge
	// Bytes of raw file data ever loaded into the cache.
	totalCacheSize prometheus.Counter
}
// cacheEntry is the cache's bookkeeping record for a single file ID.
type cacheEntry struct {
	// refCount is the number of outstanding references to this entry.
	// Safety: only read or write refCount while holding the Cache lock.
	refCount int
	// value produces the file contents on first Load. Load is called
	// without holding the Cache lock so that slow fetches run in parallel.
	value *lazy.ValueWithError[CacheEntryValue]

	// close releases one reference and purges the entry when none remain.
	// Safety: only call close while holding the Cache lock.
	close func()
	// purge removes the entry from the cache; repeated calls are no-ops.
	// Safety: only call purge while holding the Cache lock.
	purge func()
}
// CacheEntryValue is the result of loading a file into the cache. It holds
// the file's contents exposed as an fs.FS, the RBAC object used to authorize
// each caller, and the size of the raw file data in bytes.
type CacheEntryValue struct {
	fs.FS
	Object rbac.Object
	Size   int64
}
var _ fs.FS = (*CloseFS)(nil)

// CloseFS is a wrapper around fs.FS that adds a Close method. Close tells
// the cache to release the reference backing this file system. Once all open
// references to a file are closed, the file is removed from the cache.
//
// Note: Close has no error return, so CloseFS does not satisfy io.Closer.
type CloseFS struct {
	fs.FS

	// close releases the reference. It is nil when a CloseFS is built
	// without one (for example, by code outside this package that only sets
	// FS). In that case Close is a no-op.
	close func()
}

// Close releases the reference held by f. It is safe to call on a nil
// *CloseFS or on one whose release function was never set.
func (f *CloseFS) Close() {
	if f == nil || f.close == nil {
		return
	}
	f.close()
}
// Acquire loads the fs.FS for the given file. Concurrent calls for the same
// fileID share a single fetch, while calls for different fileIDs fetch in
// parallel.
//
// The shared fetch runs under a system context and actor, so the caller's
// own context and actor are checked here once the fetch has completed.
//
// Safety: Every call to Acquire that does not return an error must call
// Close on the returned value when it is done being used.
func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error) {
	// prepare holds the mutex only while it finds or creates the entry. The
	// Load below runs after the mutex is released. Otherwise the lock would
	// be held for the whole (possibly slow) fetch, and other files could not
	// be fetched in parallel.
	entry := c.prepare(db, fileID)
	val, loadErr := entry.value.Load()
	if loadErr != nil {
		// Drop our reference and evict the failed entry, so a later Acquire
		// creates a fresh entry and fetches again.
		c.lock.Lock()
		defer c.lock.Unlock()
		entry.close()
		entry.purge()
		return nil, loadErr
	}

	// release gives back the reference taken by prepare.
	release := func() {
		c.lock.Lock()
		defer c.lock.Unlock()
		entry.close()
	}

	// Check the caller's own context and actor. ctx.Err() is checked
	// explicitly, even though Authorize takes a context, because the mock
	// database implementations do not check for context cancellation.
	accessErr := func() error {
		if err := ctx.Err(); err != nil {
			return err
		}
		subject, ok := dbauthz.ActorFromContext(ctx)
		if !ok {
			return dbauthz.ErrNoActor
		}
		return c.authz.Authorize(ctx, subject, policy.ActionRead, val.Object)
	}()
	if accessErr != nil {
		release()
		return nil, accessErr
	}

	// sync.Once makes Close idempotent, so closing twice cannot release the
	// reference twice.
	var once sync.Once
	return &CloseFS{
		FS: val.FS,
		close: func() {
			once.Do(release)
		},
	}, nil
}
// prepare returns the entry for fileID and takes one reference on it. If no
// entry exists, it creates one whose value is fetched lazily on first Load.
// The caller owns the new reference and must release it through
// entry.close while holding the Cache lock. prepare holds the lock only for
// this bookkeeping, never for the fetch itself.
func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry {
	c.lock.Lock()
	defer c.lock.Unlock()

	entry, hit := c.data[fileID]
	if !hit {
		var purgeOnce sync.Once
		created := &cacheEntry{}
		created.value = lazy.NewWithError(func() (CacheEntryValue, error) {
			loaded, err := fetch(db, fileID)
			if err != nil {
				return loaded, err
			}
			// Count the newly loaded bytes in the size metrics.
			size := float64(loaded.Size)
			c.currentCacheSize.Add(size)
			c.totalCacheSize.Add(size)
			return loaded, nil
		})
		created.close = func() {
			created.refCount--
			c.currentOpenFileReferences.Dec()
			if created.refCount <= 0 {
				created.purge()
			}
		}
		created.purge = func() {
			// Only the first purge evicts; later calls do nothing.
			purgeOnce.Do(func() {
				c.purge(fileID)
			})
		}

		c.data[fileID] = created
		c.currentOpenFiles.Inc()
		c.totalOpenedFiles.Inc()
		entry = created
	}

	hitLabel := "false"
	if hit {
		hitLabel = "true"
	}
	c.currentOpenFileReferences.Inc()
	c.totalOpenFileReferences.WithLabelValues(hitLabel).Inc()
	entry.refCount++
	return entry
}
// purge evicts the entry for fileID from the cache immediately, even if it
// still has open references, and updates the open-file and size gauges.
//
// Safety: Must only be called while the Cache lock is held.
func (c *Cache) purge(fileID uuid.UUID) {
	entry, found := c.data[fileID]
	if !found {
		// Most likely a failed fetch whose entry was already purged. It may
		// also be an erroneous extra close. The two cases cannot currently
		// be told apart, so both are a no-op.
		return
	}

	c.currentOpenFiles.Dec()
	// Only successfully loaded entries were added to the size gauge.
	if loaded, err := entry.value.Load(); err == nil {
		c.currentCacheSize.Sub(float64(loaded.Size))
	}
	delete(c.data, fileID)
}
// Count reports how many files the cache currently holds. It is mainly
// intended for assertions in unit tests.
func (c *Cache) Count() int {
	c.lock.Lock()
	defer c.lock.Unlock()

	n := len(c.data)
	return n
}
// fetch reads a file from the database and exposes its contents as an
// fs.FS. Zip mimetypes are opened as zip archives. Anything else is treated
// as a tar archive.
//
// Many callers can be waiting on the same fetch at once. A failure specific
// to whichever caller started the fetch must not fail all of them:
//   - the fetch always uses an uncancelable context, and each acquirer
//     checks its own context afterwards;
//   - the fetch always runs as a system actor, and each acquirer is
//     authorized separately afterwards.
//
// This keeps a canceled context or an unauthorized user from "holding up the
// queue".
func fetch(store database.Store, fileID uuid.UUID) (CacheEntryValue, error) {
	//nolint:gocritic
	file, err := store.GetFileByID(dbauthz.AsFileReader(context.Background()), fileID)
	if err != nil {
		return CacheEntryValue{}, xerrors.Errorf("failed to read file from database: %w", err)
	}

	data := file.Data
	size := int64(len(data))

	var contents fs.FS
	if m := file.Mimetype; m == "application/zip" || m == "application/x-zip-compressed" {
		zipFS, zipErr := archivefs.FromZipReader(bytes.NewReader(data), size)
		if zipErr != nil {
			return CacheEntryValue{}, xerrors.Errorf("failed to read zip file: %w", zipErr)
		}
		contents = zipFS
	} else {
		// Any other mimetype is assumed to be "application/x-tar".
		contents = archivefs.FromTarReader(bytes.NewBuffer(data))
	}

	return CacheEntryValue{
		FS:     contents,
		Object: file.RBACObject(),
		Size:   size,
	}, nil
}