|
1 | 1 | package files
|
2 | 2 |
|
3 | 3 | import (
|
4 |
| - "archive/tar" |
5 | 4 | "bytes"
|
6 | 5 | "context"
|
7 |
| - "fmt" |
8 |
| - "io" |
9 | 6 | "io/fs"
|
10 | 7 | "sync"
|
11 |
| - "time" |
| 8 | + "sync/atomic" |
12 | 9 |
|
| 10 | + archivefs "github.com/coder/coder/v2/archive/fs" |
13 | 11 | "github.com/coder/coder/v2/coderd/database"
|
14 | 12 | "github.com/coder/coder/v2/coderd/util/lazy"
|
15 | 13 | "github.com/google/uuid"
|
16 | 14 | "golang.org/x/xerrors"
|
17 | 15 | )
|
18 | 16 |
|
| 17 | +// NewFromStore returns a file cache that will fetch files from the provided |
| 18 | +// database. |
| 19 | +func NewFromStore(store database.Store) Cache { |
| 20 | + fetcher := func(ctx context.Context, fileID uuid.UUID) (fs.FS, error) { |
| 21 | + file, err := store.GetFileByID(ctx, fileID) |
| 22 | + if err != nil { |
| 23 | + return nil, xerrors.Errorf("failed to read file from database: %w", err) |
| 24 | + } |
| 25 | + |
| 26 | + content := bytes.NewBuffer(file.Data) |
| 27 | + return archivefs.FromTar(content), nil |
| 28 | + } |
| 29 | + |
| 30 | + return Cache{ |
| 31 | + fetcher: fetcher, |
| 32 | + } |
| 33 | +} |
| 34 | + |
// Cache persists the files for template versions, and is used by dynamic
// parameters to deduplicate the files in memory. When any number of users opens
// the workspace creation form for a given template version, its files are
// loaded into memory exactly once. We hold those files until there are no
// longer any open connections, and then we remove the value from the map.
//
// NOTE(review): methods are defined on *Cache and the struct embeds a
// sync.Mutex via the lock field, so a Cache must not be copied after first
// use — confirm callers hold the value returned by NewFromStore in one place.
type Cache struct {
	// lock guards data; it is only held for map bookkeeping, never while a
	// file is being fetched.
	lock sync.Mutex
	// data maps a file's ID to its ref-counted, lazily-loaded filesystem.
	data map[uuid.UUID]*cacheEntry

	// fetcher loads a file's contents on cache miss (e.g. from the database;
	// see NewFromStore).
	fetcher func(context.Context, uuid.UUID) (fs.FS, error)
}
|
// cacheEntry pairs a lazily-loaded filesystem with the number of callers
// currently holding a reference to it.
type cacheEntry struct {
	// refCount is incremented by prepare (via Acquire) and decremented by
	// Release; the entry is evicted from Cache.data once it drops below 1.
	refCount *atomic.Int64
	value    *lazy.ValueWithError[fs.FS]
}
40 | 51 |
|
41 |
| -// Acquire |
42 |
| -func (c *Cache) Acquire(fileID uuid.UUID) fs.FS { |
43 |
| - return c.fetch(fileID).Load() |
| 52 | +// Acquire will load the fs.FS for the given file. It guarantees that parallel |
| 53 | +// calls for the same fileID will only result in one fetch, and that parallel |
| 54 | +// calls for distinct fileIDs will fetch in parallel. |
| 55 | +func (c *Cache) Acquire(ctx context.Context, fileID uuid.UUID) (fs.FS, error) { |
| 56 | + // It's important that this `Load` call occurs outside of `prepare`, after the |
| 57 | + // mutex has been released, or we would continue to hold the lock until the |
| 58 | + // entire file has been fetched, which may be slow, and would prevent other |
| 59 | + // files from being fetched in parallel. |
| 60 | + return c.prepare(ctx, fileID).Load() |
44 | 61 | }
|
45 | 62 |
|
46 |
| -// fetch handles grabbing the lock, creating a new lazy.Value if necessary, |
47 |
| -// and returning it. The lock can be safely released because lazy.Value handles |
48 |
| -// its own synchronization, so multiple concurrent reads for the same fileID |
49 |
| -// will still only ever result in a single load being performed. |
50 |
| -func (c *Cache) fetch(fileID uuid.UUID) *lazy.Value[fs.FS] { |
51 |
| - c.Mutex.Lock() |
52 |
| - defer c.Mutex.Unlock() |
| 63 | +func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[fs.FS] { |
| 64 | + c.lock.Lock() |
| 65 | + defer c.lock.Unlock() |
53 | 66 |
|
54 |
| - entry := c.data[fileID] |
55 |
| - if entry == nil { |
56 |
| - entry = lazy.New(func() fs.FS { |
57 |
| - time.Sleep(5 * time.Second) |
58 |
| - return NilFS{} |
| 67 | + entry, ok := c.data[fileID] |
| 68 | + if !ok { |
| 69 | + var refCount atomic.Int64 |
| 70 | + value := lazy.NewWithError(func() (fs.FS, error) { |
| 71 | + return c.fetcher(ctx, fileID) |
59 | 72 | })
|
60 |
| - c.data[fileID] = entry |
61 |
| - } |
62 |
| - |
63 |
| - return entry |
64 |
| -} |
65 | 73 |
|
66 |
| -func NewFromStore(store database.Store) Cache { |
67 |
| - _ = func(ctx context.Context, fileID uuid.UUID) (fs.FS, error) { |
68 |
| - file, err := store.GetFileByID(ctx, fileID) |
69 |
| - if err != nil { |
70 |
| - return nil, xerrors.Errorf("failed to read file from database: %w", err) |
| 74 | + entry = &cacheEntry{ |
| 75 | + value: value, |
| 76 | + refCount: &refCount, |
71 | 77 | }
|
72 |
| - |
73 |
| - reader := tar.NewReader(bytes.NewBuffer(file.Data)) |
74 |
| - _, _ = io.ReadAll(reader) |
75 |
| - |
76 |
| - return NilFS{}, nil |
| 78 | + c.data[fileID] = entry |
77 | 79 | }
|
78 | 80 |
|
79 |
| - return Cache{} |
| 81 | + entry.refCount.Add(1) |
| 82 | + return entry.value |
80 | 83 | }
|
81 | 84 |
|
82 |
| -type NilFS struct{} |
| 85 | +// Release decrements the reference count for the given fileID, and frees the |
| 86 | +// backing data if there are no further references being held. |
| 87 | +func (c *Cache) Release(fileID uuid.UUID) { |
| 88 | + c.lock.Lock() |
| 89 | + defer c.lock.Unlock() |
83 | 90 |
|
84 |
| -var _ fs.FS = NilFS{} |
85 |
| - |
86 |
| -func (t NilFS) Open(_ string) (fs.File, error) { |
87 |
| - return nil, fmt.Errorf("oh no") |
| 91 | + entry, ok := c.data[fileID] |
| 92 | + if !ok { |
| 93 | + // If we land here, it's almost certainly because a bug already happened, |
| 94 | + // and we're freeing something that's already been freed, or we're calling |
| 95 | + // this function with an incorrect ID. Should this function return an error? |
| 96 | + return |
| 97 | + } |
| 98 | + refCount := entry.refCount.Add(-1) |
| 99 | + if refCount < 1 { |
| 100 | + delete(c.data, fileID) |
| 101 | + } |
88 | 102 | }
|
0 commit comments