Skip to content

Commit 3ed6a92

Browse files
committed
wow!
1 parent 3c6b6ed commit 3ed6a92

File tree

4 files changed

+102
-135
lines changed

4 files changed

+102
-135
lines changed

archive/fs/fs.go

Lines changed: 0 additions & 56 deletions
This file was deleted.

archive/fs/tar.go

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,14 @@ import (
44
"archive/tar"
55
"io"
66
"io/fs"
7-
"path"
87

9-
"golang.org/x/xerrors"
8+
"github.com/spf13/afero"
9+
"github.com/spf13/afero/tarfs"
1010
)
1111

12-
func FromTar(r tar.Reader) (fs.FS, error) {
13-
fs := FS{files: make(map[string]fs.File)}
14-
for {
15-
it, err := r.Next()
16-
17-
if err != nil {
18-
return nil, xerrors.Errorf("failed to read tar archive: %w", err)
19-
}
20-
21-
// bufio.NewReader(&r).
22-
content, err := io.ReadAll(&r)
23-
fs.files[it.Name] = &File{
24-
info: it.FileInfo(),
25-
content: content,
26-
}
27-
}
28-
29-
path.Split(path string)
30-
31-
return fs, nil
12+
func FromTar(r io.Reader) fs.FS {
13+
tr := tar.NewReader(r)
14+
tfs := tarfs.New(tr)
15+
rofs := afero.NewReadOnlyFs(tfs)
16+
return afero.NewIOFS(rofs)
3217
}

coderd/files/cache.go

Lines changed: 71 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,88 +1,102 @@
11
package files
22

33
import (
4-
"archive/tar"
54
"bytes"
65
"context"
7-
"fmt"
8-
"io"
96
"io/fs"
107
"sync"
11-
"time"
8+
"sync/atomic"
129

10+
archivefs "github.com/coder/coder/v2/archive/fs"
1311
"github.com/coder/coder/v2/coderd/database"
1412
"github.com/coder/coder/v2/coderd/util/lazy"
1513
"github.com/google/uuid"
1614
"golang.org/x/xerrors"
1715
)
1816

17+
// NewFromStore returns a file cache that will fetch files from the provided
18+
// database.
19+
func NewFromStore(store database.Store) Cache {
20+
fetcher := func(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
21+
file, err := store.GetFileByID(ctx, fileID)
22+
if err != nil {
23+
return nil, xerrors.Errorf("failed to read file from database: %w", err)
24+
}
25+
26+
content := bytes.NewBuffer(file.Data)
27+
return archivefs.FromTar(content), nil
28+
}
29+
30+
return Cache{
31+
fetcher: fetcher,
32+
}
33+
}
34+
1935
// Cache persists the files for template versions, and is used by dynamic
20-
// parameters to deduplicate the files in memory.
21-
// - The user connects to the dynamic parameters websocket with a given template
22-
// version id.
23-
// - template version -> provisioner job -> file
24-
// - We persist those files
25-
//
26-
// Requirements:
27-
// - Multiple template versions can share a single "file"
28-
// - Files should be "ref counted" so that they're released when no one is using
29-
// them
30-
// - You should be able to fetch multiple different files in parallel, but you
31-
// should not fetch the same file multiple times in parallel.
36+
// parameters to deduplicate the files in memory. When any number of users opens
37+
// the workspace creation form for a given template version, its files are
38+
// loaded into memory exactly once. We hold those files until there are no
39+
// longer any open connections, and then we remove the value from the map.
3240
type Cache struct {
33-
sync.Mutex
34-
data map[uuid.UUID]*lazy.Value[fs.FS]
41+
lock sync.Mutex
42+
data map[uuid.UUID]*cacheEntry
43+
44+
fetcher func(context.Context, uuid.UUID) (fs.FS, error)
3545
}
3646

37-
// type CacheEntry struct {
38-
// atomic.
39-
// }
47+
type cacheEntry struct {
48+
refCount *atomic.Int64
49+
value *lazy.ValueWithError[fs.FS]
50+
}
4051

41-
// Acquire
42-
func (c *Cache) Acquire(fileID uuid.UUID) fs.FS {
43-
return c.fetch(fileID).Load()
52+
// Acquire will load the fs.FS for the given file. It guarantees that parallel
53+
// calls for the same fileID will only result in one fetch, and that parallel
54+
// calls for distinct fileIDs will fetch in parallel.
55+
func (c *Cache) Acquire(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
56+
// It's important that this `Load` call occurs outside of `prepare`, after the
57+
// mutex has been released, or we would continue to hold the lock until the
58+
// entire file has been fetched, which may be slow, and would prevent other
59+
// files from being fetched in parallel.
60+
return c.prepare(ctx, fileID).Load()
4461
}
4562

46-
// fetch handles grabbing the lock, creating a new lazy.Value if necessary,
47-
// and returning it. The lock can be safely released because lazy.Value handles
48-
// its own synchronization, so multiple concurrent reads for the same fileID
49-
// will still only ever result in a single load being performed.
50-
func (c *Cache) fetch(fileID uuid.UUID) *lazy.Value[fs.FS] {
51-
c.Mutex.Lock()
52-
defer c.Mutex.Unlock()
63+
func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[fs.FS] {
64+
c.lock.Lock()
65+
defer c.lock.Unlock()
5366

54-
entry := c.data[fileID]
55-
if entry == nil {
56-
entry = lazy.New(func() fs.FS {
57-
time.Sleep(5 * time.Second)
58-
return NilFS{}
67+
entry, ok := c.data[fileID]
68+
if !ok {
69+
var refCount atomic.Int64
70+
value := lazy.NewWithError(func() (fs.FS, error) {
71+
return c.fetcher(ctx, fileID)
5972
})
60-
c.data[fileID] = entry
61-
}
62-
63-
return entry
64-
}
6573

66-
func NewFromStore(store database.Store) Cache {
67-
_ = func(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
68-
file, err := store.GetFileByID(ctx, fileID)
69-
if err != nil {
70-
return nil, xerrors.Errorf("failed to read file from database: %w", err)
74+
entry = &cacheEntry{
75+
value: value,
76+
refCount: &refCount,
7177
}
72-
73-
reader := tar.NewReader(bytes.NewBuffer(file.Data))
74-
_, _ = io.ReadAll(reader)
75-
76-
return NilFS{}, nil
78+
c.data[fileID] = entry
7779
}
7880

79-
return Cache{}
81+
entry.refCount.Add(1)
82+
return entry.value
8083
}
8184

82-
type NilFS struct{}
85+
// Release decrements the reference count for the given fileID, and frees the
86+
// backing data if there are no further references being held.
87+
func (c *Cache) Release(fileID uuid.UUID) {
88+
c.lock.Lock()
89+
defer c.lock.Unlock()
8390

84-
var _ fs.FS = NilFS{}
85-
86-
func (t NilFS) Open(_ string) (fs.File, error) {
87-
return nil, fmt.Errorf("oh no")
91+
entry, ok := c.data[fileID]
92+
if !ok {
93+
// If we land here, it's almost certainly because a bug already happened,
94+
// and we're freeing something that's already been freed, or we're calling
95+
// this function with an incorrect ID. Should this function return an error?
96+
return
97+
}
98+
refCount := entry.refCount.Add(-1)
99+
if refCount < 1 {
100+
delete(c.data, fileID)
101+
}
88102
}

coderd/util/lazy/value.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,27 @@ func (v *Value[T]) Load() T {
2626
func New[T any](fn func() T) *Value[T] {
2727
return &Value[T]{fn: fn}
2828
}
29+
30+
type ValueWithError[T any] struct {
31+
inner Value[result[T]]
32+
}
33+
34+
type result[T any] struct {
35+
value T
36+
err error
37+
}
38+
39+
// NewWithError allows you to provide a lazy initializer that can fail.
40+
func NewWithError[T any](fn func() (T, error)) *ValueWithError[T] {
41+
return &ValueWithError[T]{
42+
inner: Value[result[T]]{fn: func() result[T] {
43+
value, err := fn()
44+
return result[T]{value: value, err: err}
45+
}},
46+
}
47+
}
48+
49+
func (v *ValueWithError[T]) Load() (T, error) {
50+
result := v.inner.Load()
51+
return result.value, result.err
52+
}

0 commit comments

Comments
 (0)