@@ -20,8 +20,11 @@ type LatencyMeasurer struct {
20
20
channel uuid.UUID
21
21
logger slog.Logger
22
22
23
+ // background measurement members
23
24
collections atomic.Int64
24
25
last atomic.Value
26
+ asyncTick * time.Ticker
27
+ stop chan struct {}
25
28
}
26
29
27
30
type LatencyMeasurement struct {
@@ -36,6 +39,7 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
36
39
return & LatencyMeasurer {
37
40
channel : uuid .New (),
38
41
logger : logger ,
42
+ stop : make (chan struct {}, 1 ),
39
43
}
40
44
}
41
45
@@ -47,6 +51,7 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
47
51
)
48
52
49
53
msg := []byte (uuid .New ().String ())
54
+ lm .logger .Debug (ctx , "performing measurement" , slog .F ("msg" , msg ))
50
55
51
56
cancel , err := p .Subscribe (lm .latencyChannelName (), func (ctx context.Context , in []byte ) {
52
57
if ! bytes .Equal (in , msg ) {
@@ -81,23 +86,32 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
81
86
// MeasureAsync runs latency measurements asynchronously on a given interval.
82
87
// This function is expected to be run in a goroutine and will exit when the context is canceled.
83
88
func (lm * LatencyMeasurer ) MeasureAsync (ctx context.Context , p Pubsub , interval time.Duration ) {
84
- tick := time .NewTicker (interval )
85
- defer tick .Stop ()
86
-
87
- for ; true ; <- tick .C { // tick immediately
88
- select {
89
- case <- ctx .Done ():
89
+ lm .asyncTick = time .NewTicker (interval )
90
+ defer lm .asyncTick .Stop ()
91
+
92
+ loop:
93
+ for {
94
+ // run immediately on first call, then sleep a tick before each invocation
95
+ if p == nil {
96
+ lm .logger .Error (ctx , "given pubsub is nil" )
90
97
return
91
- default :
92
- if p == nil {
93
- lm .logger .Error (ctx , "given pubsub is nil" )
94
- return
95
- }
96
98
}
97
99
98
100
lm .collections .Add (1 )
99
101
measure := lm .Measure (ctx , p )
100
102
lm .last .Store (& measure )
103
+
104
+ select {
105
+ case <- lm .asyncTick .C :
106
+ continue
107
+
108
+ // bail out if signaled
109
+ case <- lm .stop :
110
+ break loop
111
+ case <- ctx .Done ():
112
+ lm .logger .Debug (ctx , "async measurement context canceled" , slog .Error (ctx .Err ()))
113
+ break loop
114
+ }
101
115
}
102
116
}
103
117
@@ -115,6 +129,15 @@ func (lm *LatencyMeasurer) MeasurementCount() int64 {
115
129
return lm .collections .Load ()
116
130
}
117
131
132
+ // Stop stops any background measurements.
133
+ func (lm * LatencyMeasurer ) Stop () {
134
+ if lm .asyncTick == nil {
135
+ return
136
+ }
137
+ lm .asyncTick .Stop ()
138
+ lm .stop <- struct {}{}
139
+ }
140
+
118
141
func (lm * LatencyMeasurer ) latencyChannelName () string {
119
142
return fmt .Sprintf ("latency-measure:%s" , lm .channel )
120
143
}
0 commit comments