8000 chore: add files cache for reading template tar archives from db (#17… · coder/coder@ac7ea08 · GitHub
[go: up one dir, main page]

Skip to content

Commit ac7ea08

Browse files
authored
chore: add files cache for reading template tar archives from db (#17141)
1 parent c062942 commit ac7ea08

File tree

5 files changed

+308
-0
lines changed

5 files changed

+308
-0
lines changed

archive/fs/tar.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package archivefs
2+
3+
import (
4+
"archive/tar"
5+
"io"
6+
"io/fs"
7+
8+
"github.com/spf13/afero"
9+
"github.com/spf13/afero/tarfs"
10+
)
11+
12+
func FromTarReader(r io.Reader) fs.FS {
13+
tr := tar.NewReader(r)
14+
tfs := tarfs.New(tr)
15+
rofs := afero.NewReadOnlyFs(tfs)
16+
return afero.NewIOFS(rofs)
17+
}

coderd/files/cache.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package files
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"io/fs"
7+
"sync"
8+
9+
"github.com/google/uuid"
10+
"golang.org/x/xerrors"
11+
12+
archivefs "github.com/coder/coder/v2/archive/fs"
13+
"github.com/coder/coder/v2/coderd/database"
14+
"github.com/coder/coder/v2/coderd/util/lazy"
15+
)
16+
17+
// NewFromStore returns a file cache that will fetch files from the provided
18+
// database.
19+
func NewFromStore(store database.Store) Cache {
20+
fetcher := func(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
21+
file, err := store.GetFileByID(ctx, fileID)
22+
if err != nil {
23+
return nil, xerrors.Errorf("failed to read file from database: %w", err)
24+
}
25+
26+
content := bytes.NewBuffer(file.Data)
27+
return archivefs.FromTarReader(content), nil
28+
}
29+
30+
return Cache{
31+
lock: sync.Mutex{},
32+
data: make(map[uuid.UUID]*cacheEntry),
33+
fetcher: fetcher,
34+
}
35+
}
36+
37+
// Cache persists the files for template versions, and is used by dynamic
// parameters to deduplicate the files in memory. When any number of users opens
// the workspace creation form for a given template version, its files are
// loaded into memory exactly once. We hold those files until there are no
// longer any open connections, and then we remove the value from the map.
type Cache struct {
	// lock guards data and every cacheEntry.refCount.
	lock sync.Mutex
	// data maps a file ID to its reference-counted, lazily-fetched filesystem.
	data map[uuid.UUID]*cacheEntry
	// fetcher is embedded so the cache can invoke it as c.fetcher(ctx, id).
	fetcher
}
47+
48+
// cacheEntry pairs a lazily-fetched filesystem with the number of callers
// currently holding a reference to it.
type cacheEntry struct {
	// refCount must only be accessed while the Cache lock is held.
	refCount int
	// value fetches the file at most once and caches the (fs.FS, error)
	// result for all subsequent loads.
	value *lazy.ValueWithError[fs.FS]
}
53+
54+
// fetcher loads the file with the given ID and exposes it as an fs.FS.
type fetcher func(context.Context, uuid.UUID) (fs.FS, error)
55+
56+
// Acquire will load the fs.FS for the given file. It guarantees that parallel
// calls for the same fileID will only result in one fetch, and that parallel
// calls for distinct fileIDs will fetch in parallel.
//
// Every call to Acquire must have a matching call to Release.
func (c *Cache) Acquire(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
	// It's important that this `Load` call occurs outside of `prepare`, after the
	// mutex has been released, or we would continue to hold the lock until the
	// entire file has been fetched, which may be slow, and would prevent other
	// files from being fetched in parallel.
	return c.prepare(ctx, fileID).Load()
}
68+
69+
func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[fs.FS] {
70+
c.lock.Lock()
71+
defer c.lock.Unlock()
72+
73+
entry, ok := c.data[fileID]
74+
if !ok {
75+
value := lazy.NewWithError(func() (fs.FS, error) {
76+
return c.fetcher(ctx, fileID)
77+
})
78+
79+
entry = &cacheEntry{
80+
value: value,
81+
refCount: 0,
82+
}
83+
c.data[fileID] = entry
84+
}
85+
86+
entry.refCount++
87+
return entry.value
88+
}
89+
90+
// Release decrements the reference count for the given fileID, and frees the
91+
// backing data if there are no further references being held.
92+
func (c *Cache) Release(fileID uuid.UUID) {
93+
c.lock.Lock()
94+
defer c.lock.Unlock()
95+
96+
entry, ok := c.data[fileID]
97+
if !ok {
98+
// If we land here, it's almost certainly because a bug already happened,
99+
// and we're freeing something that's already been freed, or we're calling
100+
// this function with an incorrect ID. Should this function return an error?
101+
return
102+
}
103+
104+
entry.refCount--
105+
if entry.refCount > 0 {
106+
return
107+
}
108+
109+
delete(c.data, fileID)
110+
}

coderd/files/cache_internal_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package files
2+
3+
import (
4+
"context"
5+
"io/fs"
6+
"sync"
7+
"sync/atomic"
8+
"testing"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"github.com/spf13/afero"
13+
"github.com/stretchr/testify/require"
14+
"golang.org/x/sync/errgroup"
15+
16+
"github.com/coder/coder/v2/testutil"
17+
)
18+
19+
func TestConcurrency(t *testing.T) {
20+
t.Parallel()
21+
22+
emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
23+
var fetches atomic.Int64
24+
c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
25+
fetches.Add(1)
26+
// Wait long enough before returning to make sure that all of the goroutines
27+
// will be waiting in line, ensuring that no one duplicated a fetch.
28+
time.Sleep(testutil.IntervalMedium)
29+
return emptyFS, nil
30+
})
31+
32+
batches := 1000
33+
groups := make([]*errgroup.Group, 0, batches)
34+
for range batches {
35+
groups = append(groups, new(errgroup.Group))
36+
}
37+
38+
// Call Acquire with a unique ID per batch, many times per batch, with many
39+
// batches all in parallel. This is pretty much the worst-case scenario:
40+
// thousands of concurrent reads, with both warm and cold loads happening.
41+
batchSize := 10
42+
for _, g := range groups {
43+
id := uuid.New()
44+
for range batchSize {
45+
g.Go(func() error {
46+
// We don't bother to Release these references because the Cache will be
47+
// released at the end of the test anyway.
48+
_, err := c.Acquire(t.Context(), id)
49+
return err
50+
})
51+
}
52+
}
53+
54+
for _, g := range groups {
55+
require.NoError(t, g.Wait())
56+
}
57+
require.Equal(t, int64(batches), fetches.Load())
58+
}
59+
60+
func TestRelease(t *testing.T) {
61+
t.Parallel()
62+
63+
emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
64+
c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
65+
return emptyFS, nil
66+
})
67+
68+
batches := 100
69+
ids := make([]uuid.UUID, 0, batches)
70+
for range batches {
71+
ids = append(ids, uuid.New())
72+
}
73+
74+
// Acquire a bunch of references
75+
batchSize := 10
76+
for _, id := range ids {
77+
for range batchSize {
78+
it, err := c.Acquire(t.Context(), id)
79+
require.NoError(t, err)
80+
require.Equal(t, emptyFS, it)
81+
}
82+
}
83+
84+
// Make sure cache is fully loaded
85+
require.Equal(t, len(c.data), batches)
86+
87+
// Now release all of the references
88+
for _, id := range ids {
89+
for range batchSize {
90+
c.Release(id)
91+
}
92+
}
93+
94+
// ...and make sure that the cache has emptied itself.
95+
require.Equal(t, len(c.data), 0)
96+
}
97+
98+
func newTestCache(fetcher func(context.Context, uuid.UUID) (fs.FS, error)) Cache {
99+
return Cache{
100+
lock: sync.Mutex{},
101+
data: make(map[uuid.UUID]*cacheEntry),
102+
fetcher: fetcher,
103+
}
104+
}

coderd/util/lazy/valuewitherror.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package lazy
2+
3+
// ValueWithError wraps a lazy Value whose initializer can fail, caching both
// the computed value and the error from a single invocation of the
// initializer.
type ValueWithError[T any] struct {
	inner Value[result[T]]
}
6+
7+
// result bundles a value with the error produced while computing it, so the
// pair can be stored inside a single lazy Value.
type result[T any] struct {
	value T
	err error
}
11+
12+
// NewWithError allows you to provide a lazy initializer that can fail.
13+
func NewWithError[T any](fn func() (T, error)) *ValueWithError[T] {
14+
return &ValueWithError[T]{
15+
inner: Value[result[T]]{fn: func() result[T] {
16+
value, err := fn()
17+
return result[T]{value: value, err: err}
18+
}},
19+
}
20+
}
21+
22+
func (v *ValueWithError[T]) Load() (T, error) {
23+
result := v.inner.Load()
24+
return result.value, result.err
25+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package lazy_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
"golang.org/x/xerrors"
8+
9+
"github.com/coder/coder/v2/coderd/util/lazy"
10+
)
11+
12+
func TestLazyWithErrorOK(t *testing.T) {
13+
t.Parallel()
14+
15+
l := lazy.NewWithError(func() (int, error) {
16+
return 1, nil
17+
})
18+
19+
i, err := l.Load()
20+
require.NoError(t, err)
21+
require.Equal(t, 1, i)
22+
}
23+
24+
func TestLazyWithErrorErr(t *testing.T) {
25+
t.Parallel()
26+
27+
l := lazy.NewWithError(func() (int, error) {
28+
return 0, xerrors.New("oh no! everything that could went horribly wrong!")
29+
})
30+
31+
i, err := l.Load()
32+
require.Error(t, err)
33+
require.Equal(t, 0, i)
34+
}
35+
36+
func TestLazyWithErrorPointers(t *testing.T) {
37+
t.Parallel()
38+
39+
a := 1
40+
l := lazy.NewWithError(func() (*int, error) {
41+
return &a, nil
42+
})
43+
44+
b, err := l.Load()
45+
require.NoError(t, err)
46+
c, err := l.Load()
47+
require.NoError(t, err)
48+
49+
*b++
50+
*c++
51+
require.Equal(t, 3, a)
52+
}

0 commit comments

Comments
 (0)
0