Skip to content

Commit 1f20cab

Browse files
authored
fix: don't use yamux for in-memory provisioner{,d} streams (#5136)
1 parent 2b6c229 commit 1f20cab

File tree

14 files changed

+105
-57
lines changed

14 files changed

+105
-57
lines changed

cli/server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,7 @@ func newProvisionerDaemon(
890890
return nil, xerrors.Errorf("mkdir %q: %w", cfg.CacheDirectory.Value, err)
891891
}
892892

893-
terraformClient, terraformServer := provisionersdk.TransportPipe()
893+
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
894894
go func() {
895895
<-ctx.Done()
896896
_ = terraformClient.Close()
@@ -920,11 +920,11 @@ func newProvisionerDaemon(
920920
}
921921

922922
provisioners := provisionerd.Provisioners{
923-
string(database.ProvisionerTypeTerraform): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(terraformClient)),
923+
string(database.ProvisionerTypeTerraform): sdkproto.NewDRPCProvisionerClient(terraformClient),
924924
}
925925
// include echo provisioner when in dev mode
926926
if dev {
927-
echoClient, echoServer := provisionersdk.TransportPipe()
927+
echoClient, echoServer := provisionersdk.MemTransportPipe()
928928
go func() {
929929
<-ctx.Done()
930930
_ = echoClient.Close()
@@ -941,7 +941,7 @@ func newProvisionerDaemon(
941941
}
942942
}
943943
}()
944-
provisioners[string(database.ProvisionerTypeEcho)] = sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient))
944+
provisioners[string(database.ProvisionerTypeEcho)] = sdkproto.NewDRPCProvisionerClient(echoClient)
945945
}
946946
return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
947947
// This debounces calls to listen every second. Read the comment

coderd/coderd.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ func compressHandler(h http.Handler) http.Handler {
644644
// CreateInMemoryProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
645645
// in the same process.
646646
func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
647-
clientSession, serverSession := provisionersdk.TransportPipe()
647+
clientSession, serverSession := provisionersdk.MemTransportPipe()
648648
defer func() {
649649
if err != nil {
650650
_ = clientSession.Close()
@@ -705,5 +705,5 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti
705705
_ = serverSession.Close()
706706
}()
707707

708-
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.Conn(clientSession)), nil
708+
return proto.NewDRPCProvisionerDaemonClient(clientSession), nil
709709
}

coderd/coderdtest/coderdtest.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ func NewWithAPI(t *testing.T, options *Options) (*codersdk.Client, io.Closer, *c
315315
// well with coderd testing. It registers the "echo" provisioner for
316316
// quick testing.
317317
func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
318-
echoClient, echoServer := provisionersdk.TransportPipe()
318+
echoClient, echoServer := provisionersdk.MemTransportPipe()
319319
ctx, cancelFunc := context.WithCancel(context.Background())
320320
t.Cleanup(func() {
321321
_ = echoClient.Close()
@@ -339,7 +339,7 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
339339
UpdateInterval: 250 * time.Millisecond,
340340
ForceCancelInterval: time.Second,
341341
Provisioners: provisionerd.Provisioners{
342-
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)),
342+
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(echoClient),
343343
},
344344
WorkDirectory: t.TempDir(),
345345
})
@@ -350,7 +350,7 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
350350
}
351351

352352
func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uuid.UUID, tags map[string]string) io.Closer {
353-
echoClient, echoServer := provisionersdk.TransportPipe()
353+
echoClient, echoServer := provisionersdk.MemTransportPipe()
354354
ctx, cancelFunc := context.WithCancel(context.Background())
355355
t.Cleanup(func() {
356356
_ = echoClient.Close()
@@ -374,7 +374,7 @@ func NewExternalProvisionerDaemon(t *testing.T, client *codersdk.Client, org uui
374374
UpdateInterval: 250 * time.Millisecond,
375375
ForceCancelInterval: time.Second,
376376
Provisioners: provisionerd.Provisioners{
377-
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)),
377+
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(echoClient),
378378
},
379379
WorkDirectory: t.TempDir(),
380380
})

codersdk/provisionerdaemons.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,5 +212,5 @@ func (c *Client) ServeProvisionerDaemon(ctx context.Context, organization uuid.U
212212
if err != nil {
213213
return nil, xerrors.Errorf("multiplex client: %w", err)
214214
}
215-
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.Conn(session)), nil
215+
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.MultiplexedConn(session)), nil
216216
}

enterprise/cli/provisionerdaemons.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func provisionerDaemonStart() *cobra.Command {
6969
return xerrors.Errorf("mkdir %q: %w", cacheDir, err)
7070
}
7171

72-
terraformClient, terraformServer := provisionersdk.TransportPipe()
72+
terraformClient, terraformServer := provisionersdk.MemTransportPipe()
7373
go func() {
7474
<-ctx.Done()
7575
_ = terraformClient.Close()
@@ -104,7 +104,7 @@ func provisionerDaemonStart() *cobra.Command {
104104
logger.Info(ctx, "starting provisioner daemon", slog.F("tags", tags))
105105

106106
provisioners := provisionerd.Provisioners{
107-
string(database.ProvisionerTypeTerraform): proto.NewDRPCProvisionerClient(provisionersdk.Conn(terraformClient)),
107+
string(database.ProvisionerTypeTerraform): proto.NewDRPCProvisionerClient(terraformClient),
108108
}
109109
srv := provisionerd.New(func(ctx context.Context) (provisionerdproto.DRPCProvisionerDaemonClient, error) {
110110
return client.ServeProvisionerDaemon(ctx, org.ID, []codersdk.ProvisionerType{

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ replace github.com/gliderlabs/ssh => github.com/coder/ssh v0.0.0-20220811105153-
5353

5454
require (
5555
cdr.dev/slog v1.4.2-0.20220525200111-18dce5c2cd5f
56-
cloud.google.com/go/compute v1.12.1 // indirect
5756
cloud.google.com/go/compute/metadata v0.2.1
5857
github.com/AlecAivazis/survey/v2 v2.3.5
5958
github.com/adrg/xdg v0.4.0
@@ -129,6 +128,7 @@ require (
129128
github.com/tabbed/pqtype v0.1.1
130129
github.com/u-root/u-root v0.10.0
131130
github.com/unrolled/secure v1.13.0
131+
github.com/valyala/fasthttp v1.41.0
132132
go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1
133133
go.nhat.io/otelsql v0.7.0
134134
go.opentelemetry.io/otel v1.11.1
@@ -166,6 +166,7 @@ require (
166166
)
167167

168168
require (
169+
cloud.google.com/go/compute v1.12.1 // indirect
169170
filippo.io/edwards25519 v1.0.0-rc.1 // indirect
170171
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
171172
github.com/Microsoft/go-winio v0.5.2 // indirect

go.sum

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1776,8 +1776,11 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb
17761776
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
17771777
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
17781778
github.com/uudashr/gocognit v1.0.5/go.mod h1:wgYz0mitoKOTysqxTDMOUXg+Jb5SvtihkfmugIZYpEA=
1779+
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
17791780
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
17801781
github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus=
1782+
github.com/valyala/fasthttp v1.41.0 h1:zeR0Z1my1wDHTRiamBCXVglQdbUwgb9uWG3k1HQz6jY=
1783+
github.com/valyala/fasthttp v1.41.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY=
17811784
github.com/valyala/quicktemplate v1.7.0/go.mod h1:sqKJnoaOF88V07vkO+9FL8fb9uZg/VPSJnLYn+LmLk8=
17821785
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
17831786
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE=
@@ -1975,6 +1978,7 @@ golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5y
19751978
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
19761979
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
19771980
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
1981+
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
19781982
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
19791983
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
19801984
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -2117,6 +2121,7 @@ golang.org/x/net v0.0.0-20220531201128-c960675eff93/go.mod h1:XRhObCWvk6IyKnWLug
21172121
golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
21182122
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
21192123
golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
2124+
golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
21202125
golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0=
21212126
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
21222127
golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=

provisioner/echo/serve_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestEcho(t *testing.T) {
2323

2424
fs := afero.NewMemMapFs()
2525
// Create an in-memory provisioner to communicate with.
26-
client, server := provisionersdk.TransportPipe()
26+
client, server := provisionersdk.MemTransportPipe()
2727
ctx, cancelFunc := context.WithCancel(context.Background())
2828
t.Cleanup(func() {
2929
_ = client.Close()
@@ -36,7 +36,7 @@ func TestEcho(t *testing.T) {
3636
})
3737
assert.NoError(t, err)
3838
}()
39-
api := proto.NewDRPCProvisionerClient(provisionersdk.Conn(client))
39+
api := proto.NewDRPCProvisionerClient(client)
4040

4141
t.Run("Parse", func(t *testing.T) {
4242
t.Parallel()

provisioner/terraform/provision_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func setupProvisioner(t *testing.T, opts *provisionerServeOptions) (context.Cont
3636
opts = &provisionerServeOptions{}
3737
}
3838
cachePath := t.TempDir()
39-
client, server := provisionersdk.TransportPipe()
39+
client, server := provisionersdk.MemTransportPipe()
4040
ctx, cancelFunc := context.WithCancel(context.Background())
4141
serverErr := make(chan error, 1)
4242
t.Cleanup(func() {
@@ -59,7 +59,7 @@ func setupProvisioner(t *testing.T, opts *provisionerServeOptions) (context.Cont
5959
ExitTimeout: opts.exitTimeout,
6060
})
6161
}()
62-
api := proto.NewDRPCProvisionerClient(provisionersdk.Conn(client))
62+
api := proto.NewDRPCProvisionerClient(client)
6363
return ctx, api
6464
}
6565

provisionerd/provisionerd.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/prometheus/client_golang/prometheus"
1515
"github.com/prometheus/client_golang/prometheus/promauto"
1616
"github.com/spf13/afero"
17+
"github.com/valyala/fasthttp/fasthttputil"
1718
"go.opentelemetry.io/otel/attribute"
1819
semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
1920
"go.opentelemetry.io/otel/trace"
@@ -344,7 +345,7 @@ func (p *Server) acquireJob(ctx context.Context) {
344345
}
345346

346347
func retryable(err error) bool {
347-
return xerrors.Is(err, yamux.ErrSessionShutdown) || xerrors.Is(err, io.EOF) ||
348+
return xerrors.Is(err, yamux.ErrSessionShutdown) || xerrors.Is(err, io.EOF) || xerrors.Is(err, fasthttputil.ErrInmemoryListenerClosed) ||
348349
// annoyingly, dRPC sometimes returns context.Canceled if the transport was closed, even if the context for
349350
// the RPC *is not canceled*. Retrying is fine if the RPC context is not canceled.
350351
xerrors.Is(err, context.Canceled)

0 commit comments

Comments
 (0)