@@ -20,8 +20,11 @@ type LatencyMeasurer struct {
20
20
channel uuid.UUID
21
21
logger slog.Logger
22
22
23
+ // background measurement members
23
24
collections atomic.Int64
24
25
last atomic.Value
26
+ asyncTick * time.Ticker
27
+ stop chan struct {}
25
28
}
26
29
27
30
type LatencyMeasurement struct {
@@ -36,6 +39,7 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
36
39
return & LatencyMeasurer {
37
40
channel : uuid .New (),
38
41
logger : logger ,
42
+ stop : make (chan struct {}, 1 ),
39
43
}
40
44
}
41
45
@@ -47,6 +51,7 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
47
51
)
48
52
49
53
msg := []byte (uuid .New ().String ())
54
+ lm .logger .Debug (ctx , "performing measurement" , slog .F ("msg" , msg ))
50
55
51
56
cancel , err := p .Subscribe (lm .latencyChannelName (), func (ctx context.Context , in []byte ) {
52
57
if ! bytes .Equal (in , msg ) {
@@ -81,23 +86,31 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
81
86
// MeasureAsync runs latency measurements asynchronously on a given interval.
82
87
// This function is expected to be run in a goroutine and will exit when the context is canceled.
83
88
func (lm * LatencyMeasurer ) MeasureAsync (ctx context.Context , p Pubsub , interval time.Duration ) {
84
- tick : = time .NewTicker (interval )
85
- defer tick .Stop ()
89
+ lm . asyncTick = time .NewTicker (interval )
90
+ defer lm . asyncTick .Stop ()
86
91
87
- for ; true ; <- tick .C { // tick immediately
88
- select {
89
- case <- ctx .Done ():
92
+ for {
93
+ // run immediately on first call, then sleep a tick before each invocation
94
+ if p == nil {
95
+ lm .logger .Error (ctx , "given pubsub is nil" )
90
96
return
91
- default :
92
- if p == nil {
93
- lm .logger .Error (ctx , "given pubsub is nil" )
94
- return
95
- }
96
97
}
97
98
98
99
lm .collections .Add (1 )
99
100
measure := lm .Measure (ctx , p )
100
101
lm .last .Store (& measure )
102
+
103
+ select {
104
+ case <- lm .asyncTick .C :
105
+ continue
106
+
107
+ // bail out if signaled
108
+ case <- lm .stop :
109
+ return
110
+ case <- ctx .Done ():
111
+ lm .logger .Debug (ctx , "async measurement context canceled" , slog .Error (ctx .Err ()))
112
+ return
113
+ }
101
114
}
102
115
}
103
116
@@ -115,6 +128,15 @@ func (lm *LatencyMeasurer) MeasurementCount() int64 {
115
128
return lm .collections .Load ()
116
129
}
117
130
131
+ // Stop stops any background measurements.
132
+ func (lm * LatencyMeasurer ) Stop () {
133
+ if lm .asyncTick == nil {
134
+ return
135
+ }
136
+ lm .asyncTick .Stop ()
137
+ lm .stop <- struct {}{}
138
+ }
139
+
118
140
func (lm * LatencyMeasurer ) latencyChannelName () string {
119
141
return fmt .Sprintf ("latency-measure:%s" , lm .channel )
120
142
}
0 commit comments