@@ -9,24 +9,29 @@ import (
9
9
"golang.org/x/xerrors"
10
10
)
11
11
12
- var channelID uuid.UUID
12
+ // LatencyMeasurer is used to measure the send & receive latencies of the underlying Pubsub implementation. We use these
13
+ // measurements to export metrics which can indicate when a Pubsub implementation's queue is overloaded and/or full.
14
+ type LatencyMeasurer struct {
15
+ // Create unique pubsub channel names so that multiple replicas do not clash when performing latency measurements,
16
+ // and only create one UUID per Pubsub impl (and not request) to limit the number of notification channels that need
17
+ // to be maintained by the Pubsub impl.
18
+ channelIDs map [Pubsub ]uuid.UUID
19
+ }
13
20
14
- // Create a new pubsub channel UUID per coderd instance so that multiple replicas do not clash when performing latency
15
- // measurements, and only create one UUID per instance (and not request) to limit the number of notification channels
16
- // that need to be maintained by the Pubsub implementation.
17
- func init () {
18
- channelID = uuid .New ()
21
+ func NewLatencyMeasurer () * LatencyMeasurer {
22
+ return & LatencyMeasurer {
23
+ channelIDs : make (map [Pubsub ]uuid.UUID ),
24
+ }
19
25
}
20
26
21
- // MeasureLatency takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the
22
- // observed latency.
23
- func MeasureLatency (ctx context.Context , p Pubsub ) (send float64 , recv float64 , err error ) {
27
+ // Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
28
+ func (lm * LatencyMeasurer ) Measure (ctx context.Context , p Pubsub ) (send float64 , recv float64 , err error ) {
24
29
var (
25
30
start time.Time
26
31
res = make (chan float64 , 1 )
27
32
)
28
33
29
- cancel , err := p .Subscribe (latencyChannelName (), func (ctx context.Context , _ []byte ) {
34
+ cancel , err := p .Subscribe (lm . latencyChannelName (p ), func (ctx context.Context , _ []byte ) {
30
35
res <- time .Since (start ).Seconds ()
31
36
})
32
37
if err != nil {
@@ -35,7 +40,7 @@ func MeasureLatency(ctx context.Context, p Pubsub) (send float64, recv float64,
35
40
defer cancel ()
36
41
37
42
start = time .Now ()
38
- err = p .Publish (latencyChannelName (), []byte {})
43
+ err = p .Publish (lm . latencyChannelName (p ), []byte {})
39
44
if err != nil {
40
45
return - 1 , - 1 , xerrors .Errorf ("failed to publish: %w" , err )
41
46
}
@@ -50,6 +55,12 @@ func MeasureLatency(ctx context.Context, p Pubsub) (send float64, recv float64,
50
55
}
51
56
}
52
57
53
- func latencyChannelName () string {
54
- return fmt .Sprintf ("latency-measure:%s" , channelID .String ())
58
+ func (lm * LatencyMeasurer ) latencyChannelName (p Pubsub ) string {
59
+ cid , found := lm .channelIDs [p ]
60
+ if ! found {
61
+ cid = uuid .New ()
62
+ lm .channelIDs [p ] = cid
63
+ }
64
+
65
+ return fmt .Sprintf ("latency-measure:%s" , cid .String ())
55
66
}
0 commit comments