@@ -44,12 +44,15 @@ const (
4444
4545// HttpForwarderHandlerV2 is a PipelineHandler which sends metrics to another gostatsd instance
4646type HttpForwarderHandlerV2 struct {
47- postId uint64 // atomic - used for an id in logs
48- messagesInvalid uint64 // atomic - messages which failed to be created
49- messagesCreated uint64 // atomic - messages which were created
50- messagesSent uint64 // atomic - messages successfully sent
51- messagesRetried uint64 // atomic - retries (first send is not a retry, final failure is not a retry)
52- messagesDropped uint64 // atomic - final failure
47+ postId uint64 // atomic - used for an id in logs
48+ messagesInvalid uint64 // atomic - messages which failed to be created
49+ messagesCreated uint64 // atomic - messages which were created
50+ messagesSent uint64 // atomic - messages successfully sent
51+ messagesRetried uint64 // atomic - retries (first send is not a retry, final failure is not a retry)
52+ messagesDropped uint64 // atomic - final failure
53+ postLatencyTotal atomic.Int64 // total of the time taken to send messages in a flush interval
54+ postLatencyMax atomic.Int64 // maximum time taken to send a message in a flush interval
55+
5356 lastSuccessfulSend atomic.Int64
5457
5558 logger logrus.FieldLogger
@@ -299,6 +302,11 @@ func (hfh *HttpForwarderHandlerV2) emitMetrics(statser stats.Statser) {
299302 statser .Report ("http.forwarder.sent" , & hfh .messagesSent , nil )
300303 statser .Report ("http.forwarder.retried" , & hfh .messagesRetried , nil )
301304 statser .Report ("http.forwarder.dropped" , & hfh .messagesDropped , nil )
305+
306+ postLatencyMax := hfh .postLatencyMax .Swap (0 )
307+ postLatencyTotal := hfh .postLatencyTotal .Swap (0 )
308+ statser .Gauge ("http.forwarder.post_latency.max" , float64 (postLatencyMax ), nil )
309+ statser .Count ("http.forwarder.post_latency.sum" , float64 (postLatencyTotal ), nil )
302310}
303311
304312// sendNop sends an empty metric map downstream. It's used to "prime the pump" for the deepcheck.
@@ -465,9 +473,18 @@ func (hfh *HttpForwarderHandlerV2) post(ctx context.Context, message proto.Messa
465473 b .MaxElapsedTime = hfh .maxRequestElapsedTime
466474
467475 for {
476+ startTime := clock .Now (ctx )
468477 if err = post (); err == nil {
469478 atomic .AddUint64 (& hfh .messagesSent , 1 )
470479 hfh .lastSuccessfulSend .Store (clock .Now (ctx ).UnixNano ())
480+
481+ postLatency := clock .Since (ctx , startTime ).Milliseconds ()
482+ hfh .postLatencyTotal .Add (postLatency )
483+ for old := hfh .postLatencyMax .Load (); old < postLatency ; old = hfh .postLatencyMax .Load () {
484+ if hfh .postLatencyMax .CompareAndSwap (old , postLatency ) {
485+ break
486+ }
487+ }
471488 return
472489 }
473490
0 commit comments