Skip to content

Commit

Permalink
(improvement) add maxSegmentAge in healthreporter (#547)
Browse files Browse the repository at this point in the history
adding a maxSegmentAge field in healthreporter so that it can be tracked in the logs
  • Loading branch information
vpatelsj authored Jan 29, 2025
1 parent 40601a1 commit a3087fd
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 3 deletions.
8 changes: 7 additions & 1 deletion ingestor/cluster/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type Batcher interface {

Release(batch *Batch)
Remove(batch *Batch) error
MaxSegmentAge() time.Duration
}

// Batcher manages WAL segments that are ready for upload to kusto or that need
Expand Down Expand Up @@ -125,6 +126,7 @@ type batcher struct {
maxTransferAge time.Duration
maxTransferSize int64
minUploadSize int64
maxSegmentAge time.Duration

tempSet []wal.SegmentInfo

Expand Down Expand Up @@ -186,6 +188,10 @@ func (b *batcher) SegmentsSize() int64 {
return atomic.LoadInt64(&b.segementsSize)
}

func (b *batcher) MaxSegmentAge() time.Duration {
return b.maxSegmentAge
}

func (b *batcher) watch(ctx context.Context) {
b.wg.Add(1)
defer b.wg.Done()
Expand Down Expand Up @@ -276,7 +282,7 @@ func (b *batcher) processSegments() ([]*Batch, []*Batch, error) {
metrics.IngestorSegmentsMaxAge.WithLabelValues(prefix).Set(time.Since(oldestSegment).Seconds())
metrics.IngestorSegmentsSizeBytes.WithLabelValues(prefix).Set(float64(groupSize))
metrics.IngestorSegmentsTotal.WithLabelValues(prefix).Set(float64(len(b.tempSet)))

b.maxSegmentAge = time.Since(oldestSegment)
groups[prefix] = append(groups[prefix], b.tempSet...)
}

Expand Down
2 changes: 2 additions & 0 deletions ingestor/cluster/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"context"
"time"

"github.com/Azure/adx-mon/pkg/logger"
)
Expand Down Expand Up @@ -72,3 +73,4 @@ func (f fakeBatcher) SegmentsTotal() int64 { return 0 }
func (f fakeBatcher) SegmentsSize() int64 { return 0 }
func (f fakeBatcher) Release(batch *Batch) {}
func (f fakeBatcher) Remove(batch *Batch) error { return nil }
func (f fakeBatcher) MaxSegmentAge() time.Duration { return 0 }
5 changes: 5 additions & 0 deletions ingestor/cluster/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type QueueSizer interface {
UploadQueueSize() int
SegmentsTotal() int64
SegmentsSize() int64
MaxSegmentAge() time.Duration
}

func NewHealth(opts HealthOpts) *Health {
Expand Down Expand Up @@ -158,3 +159,7 @@ func (h *Health) SegmentsTotal() int64 {
func (h *Health) SegmentsSize() int64 {
return h.QueueSizer.SegmentsSize()
}

func (h *Health) MaxSegmentAge() time.Duration {
return h.QueueSizer.MaxSegmentAge()
}
4 changes: 4 additions & 0 deletions ingestor/cluster/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type fakeQueueSizer struct {
segmentsTotal int64
}

func (f fakeQueueSizer) MaxSegmentAge() time.Duration {
return 0
}

func (f fakeQueueSizer) TransferQueueSize() int { return f.transferQueueSize }
func (f fakeQueueSizer) UploadQueueSize() int { return f.uploadQueueSize }
func (f fakeQueueSizer) SegmentsSize() int64 { return f.segmentsSize }
Expand Down
6 changes: 4 additions & 2 deletions metrics/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type HealthReporter interface {
SegmentsTotal() int64
SegmentsSize() int64
UnhealthyReason() string
MaxSegmentAge() time.Duration
}

type TimeSeriesWriter interface {
Expand Down Expand Up @@ -142,7 +143,7 @@ func (s *service) collect(ctx context.Context) {

logger.Infof("Status IngestionSamplesPerSecond=%0.2f SamplesIngested=%d IsHealthy=%v "+
"UploadQueueSize=%d TransferQueueSize=%d SegmentsTotal=%d SegmentsSize=%d UnhealthyReason=%s "+
"ActiveConnections=%d DroppedConnections=%d",
"ActiveConnections=%d DroppedConnections=%d MaxSegmentAgeSeconds=%0.2f",
(currentTotal-lastCount)/60, uint64(currentTotal),
s.health.IsHealthy(),
s.health.UploadQueueSize(),
Expand All @@ -151,7 +152,8 @@ func (s *service) collect(ctx context.Context) {
s.health.SegmentsSize(),
s.health.UnhealthyReason(),
int(activeConns),
int(droppedConns))
int(droppedConns),
s.health.MaxSegmentAge().Seconds())

lastCount = currentTotal

Expand Down

0 comments on commit a3087fd

Please sign in to comment.